package com.gitee.jastee.kafka.standalone;

import cn.hutool.log.Log;
import cn.hutool.log.LogFactory;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;

import java.util.*;

/**
 * Standalone Kafka consumer that manually assigns itself every partition of a topic.
 *
 * <p>Some use cases need a consumer bound to specific partitions. Because the partitions are
 * attached with {@code assign} instead of a group subscription, standalone consumers do not
 * interfere with one another: one of them crashing does not affect the others, and no
 * rebalance is triggered.
 *
 * @author jast
 * @date 2020/4/28 15:03
 */
public class StandaloneConsumer {
    static final Log log = LogFactory.get();

    /** Topic whose partitions are looked up and consumed. */
    private static final String TOPIC = "test";

    /** Timeout, in milliseconds, handed to every {@code poll} call. */
    private static final long POLL_TIMEOUT_MS = 10;

    /**
     * Creates the consumer, assigns it all partitions of {@link #TOPIC} and prints every
     * received record to standard output until the process is stopped or a call fails.
     *
     * <p>If the topic reports no partitions the method logs a warning and returns instead of
     * polling, because polling a consumer that has no assignment fails with an
     * {@link IllegalStateException}.
     *
     * @param args command-line arguments; not used
     */
    public static void main(String[] args) {
        // try-with-resources guarantees the consumer is closed on every exit path,
        // including an exception escaping the poll loop.
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(buildConsumerProperties())) {
            List<TopicPartition> partitions = lookupPartitions(consumer, TOPIC);
            if (partitions.isEmpty()) {
                log.warn("No partitions found for topic {}, nothing to consume", TOPIC);
                return;
            }
            log.info("partition:{}", partitions);
            // Consume every partition of the topic.
            consumer.assign(partitions);

            while (true) {
                // NOTE(review): poll(long) is deprecated in kafka-clients 2.0+ in favour of
                // poll(java.time.Duration) - switch once the client version in use is confirmed.
                ConsumerRecords<String, String> records = consumer.poll(POLL_TIMEOUT_MS);
                for (ConsumerRecord<String, String> record : records) {
                    System.out.printf("offset = %d, key = %s, value = %s , partition = %s%n", record.offset(), record.key(), record.value(),record.partition());
                }
            }
        }
    }

    /**
     * Builds the configuration used to create the consumer.
     *
     * @return consumer properties: local broker, group {@code test}, auto-commit every second,
     *         String key/value deserializers, and {@code earliest} offset reset
     */
    private static Properties buildConsumerProperties() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "test");
        props.put("enable.auto.commit", "true");
        props.put("auto.commit.interval.ms", "1000");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("auto.offset.reset","earliest");
        return props;
    }

    /**
     * Resolves all partitions of a topic into assignable {@link TopicPartition} objects.
     *
     * @param consumer consumer used to query the partition metadata
     * @param topic    name of the topic to look up
     * @return the topic's partitions; an empty list when the lookup yields {@code null} or
     *         no partitions
     */
    private static List<TopicPartition> lookupPartitions(KafkaConsumer<?, ?> consumer, String topic) {
        List<TopicPartition> partitions = new ArrayList<>();
        List<PartitionInfo> partitionInfos = consumer.partitionsFor(topic);
        if (partitionInfos == null) {
            return partitions;
        }
        for (PartitionInfo partitionInfo : partitionInfos) {
            partitions.add(new TopicPartition(partitionInfo.topic(), partitionInfo.partition()));
        }
        return partitions;
    }

}
